package com.jokey.listener;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * Flink CDC 数据处理
 *
 * @author jokey
 * @since 2024-3-14
 */
@Slf4j
@Component
public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        log.info("收到变更原始数据:{}", value);

        String op = JSON.parseObject(value).getString("op");
        if ("c".equals(op)) {
            log.info("新增了一条数据 ...");
        } else if ("u".equals(op)) {
            log.info("更新了一条数据 ...");
        } else if ("d".equals(op)) {
            log.info("删除了一条数据 ...");
        } else {
            log.error("{} 未知的操作类型！", op);
        }
    }

}
